home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / src / mppmsg.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  21KB  |  903 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: mppmsg.c,v 1.6 1997/06/25 22:09:13 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /* Define low level message functions for the MPP port */
  34.  
  35. #include <stdio.h>
  36. #include <stdlib.h>
  37. #include <sys/types.h>
  38. #if defined(NEEDSSELECTH)
  39. #include <sys/select.h>
  40. #endif
  41. #include <errno.h>
  42. #include <sys/stat.h>
  43. #include <sys/socket.h>
  44. #ifndef NOUNIXDOM
  45. #include <sys/un.h>
  46. #endif
  47. #include <netinet/in.h>
  48. #include <netinet/tcp.h>
  49. #include <pvm3.h>
  50. #include <pvmproto.h>
  51. #include "lpvm.h"
  52. #include "global.h"
  53. #include "pvmalloc.h"
  54. #include "listmac.h"
  55. #include "lmsg.h"
  56. #include "mppmsg.h"
  57. #include "mppchunk.h"
  58. #include "pvmmimd.h"
  59. #include "bfunc.h"
  60.  
  61.  
  62. /* --- external declarations */
  63.  
  64. void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
  65.  
  66. extern int errno;
  67. extern int pvmdebmask;           /* from pvmd.c */
  68.  
  69. #if defined(IMA_NODE)
  70. extern int pvmhostnode;
  71. #endif
  72.  
  73. #if defined(IMA_PGON)
  74. #include <nx.h>
  75.  
  76. #define ASYNCSEND(_app, _tag, _buf, _len, _dest, _partid, _mid)  \
  77.         _isend((long)(_tag), _buf, (long)(_len), (long)(_dest), (long)(_partid))
  78.  
  79. #define ASYNCRECV(_app, _src, _tag, _buf, _len, _partid, _info, _mid) \
  80.         _irecvx((long)(_tag), _buf, (long)(_len), (long)(_src), \
  81.                 (long)(_partid), (long *)_info)
  82.  
  83. #define MSGDONE(_app,_mid,_flag,_status) \
  84.         _msgdone(*(_mid))
  85.  
  86. #define MSGMERGE(_mid1, _mid2) \
  87.         _msgmerge(*(_mid1), *(_mid2))
  88.  
  89. #define MSGTAG(_info) \
  90.         _info[0]
  91.  
  92. #define MSGLEN(_info, _len) \
  93.         _info[1]
  94.  
  95. #define MSGSRC(_info) \
  96.         _info[2]
  97.  
  98. /* Host (daemon) uses the same nx routines that nodes do */
  99.  
  100. #define HOSTASYNCSEND ASYNCSEND
  101. #define HOSTASYNCRECV ASYNCRECV
  102. #define HOSTMSGMERGE MSGMERGE
  103. #define HOSTMSGDONE MSGDONE
  104. #define HOSTMSGTAG MSGTAG
  105. #define HOSTMSGLEN MSGLEN
  106. #define HOSTMSGSRC MSGSRC
  107.  
  108.  
  109. #endif /* IMA_PGON */
  110.  
  111. #if defined(IMA_SP2MPI)
  112. #if defined(IMA_NODE)
  113. #include <mpi.h>
  114.  
  115. #define ASYNCSEND(_app, _tag, _buf, _len, _dest, _partid, _mid) \
  116.         MPI_Isend((void *) (_buf), len, MPI_BYTE, _dest,\
  117.                  _tag, MPI_COMM_WORLD, _mid)
  118.  
  119. #define ASYNCRECV(_app, _src, _tag, _buf, _len, _partid, _info, _mid) \
  120.         MPI_Irecv((void *) (_buf), _len, MPI_BYTE, _src, _tag,\
  121.                 MPI_COMM_WORLD, _mid)
  122.  
  123. #define MSGDONE(_app,_mid,_flag,_status) \
  124.         MPI_Test(_mid, _flag, _status )
  125.  
  126. #define MSGTAG(_info) \
  127.         (_info)->MPI_TAG
  128.  
  129. #define MSGMERGE(_mid1, _mid2) NULL 
  130.         
  131.  
  132. #define MSGLEN(_info, _len) \
  133.         MPI_Get_count(_info, MPI_BYTE, _len) 
  134.  
  135. #define MSGSRC(_info) \
  136.         (_info)->MPI_SOURCE
  137.  
  138. #else /* IMA_NODE */
  139.  
  140. #define ASYNCSEND(_app, _tag, _buf, _len, _dest, _partid, _mid) 0
  141.  
  142. #define ASYNCRECV(_app, _src, _tag, _buf, _len, _partid, _info, _mid) 0
  143.  
  144. #define MSGDONE(_app,_mid,_flag,_status) 0 
  145.  
  146. #define MSGTAG(_info) 0 
  147.  
  148. #define MSGMERGE(_mid1, _mid2) NULL 
  149.         
  150. #define MSGLEN(_info, _len) 0 
  151.  
  152. #define MSGSRC(_info) 0 
  153.  
  154. #endif /*IMA_NODE*/
  155.  
  156. /* Relay process uses the same  routines that nodes do */
  157.  
  158. #define HOSTASYNCSEND ASYNCSEND
  159. #define HOSTASYNCRECV ASYNCRECV
  160. #define HOSTMSGMERGE MSGMERGE
  161. #define HOSTMSGDONE MSGDONE
  162. #define HOSTMSGTAG MSGTAG
  163. #define HOSTMSGLEN MSGLEN
  164. #define HOSTMSGSRC MSGSRC
  165.  
  166.  
  167. #endif /* IMA_SP2MPI */
  168.  
  169.  
  170. /* ----------( Node -- Node ) Routines ----------- */
  171. static int
  172. pvm_inodesend(appid, tag, buffer, len, dest, partid, mid)
  173. int appid;
  174. int tag;
  175. char *buffer;
  176. int len;
  177. int dest;
  178. int partid;
  179. msgmid_t *mid;
  180. {
  181. #if defined(IMA_PGON)
  182.     return ((*mid) = ASYNCSEND(appid, tag, buffer, len, dest, partid, mid));
  183. #endif
  184. #if defined(IMA_SP2MPI)
  185.     return  ASYNCSEND(appid, tag, buffer, len, dest, partid, mid);
  186. #endif
  187. }
  188.  
  189.  
  190. static int
  191. pvm_inoderecv(appid, src, tag, buffer, len, partid, info, mid)
  192. int appid;
  193. int src;
  194. int tag;
  195. char *buffer;
  196. int len;
  197. int partid;
  198. int *info;
  199. msgmid_t *mid;
  200. {
  201. #if defined(IMA_PGON)
  202.     return ((*mid)=ASYNCRECV(appid, src, tag, buffer, len, partid, info, mid));
  203. #endif
  204. #if defined(IMA_SP2MPI)
  205.     return ASYNCRECV(appid, src, tag, buffer, len, partid, info, mid);
  206. #endif
  207. }
  208.  
  209.  
  210. static msgmid_t 
  211. pvm_nodemsgmerge (mid1, mid2)
  212. msgmid_t *mid1;
  213. msgmid_t *mid2;
  214. {
  215.     return (msgmid_t) MSGMERGE(mid1, mid2);
  216. }
  217.  
  218. static int
  219. pvm_nodemsgdone (appid, mid, info)
  220. int appid;
  221. msgmid_t *mid;
  222. info_t *info;
  223. {
  224. int flag;
  225. #if defined(IMA_PGON)
  226.     return MSGDONE(appid, mid, &flag, info);
  227. #endif
  228. #if defined(IMA_SP2MPI)
  229.     MSGDONE(appid, mid, &flag, info);
  230.     return flag;
  231. #endif
  232. }
  233.  
  234. static int
  235. pvm_nodemsgsrc( info )
  236. info_t *info;
  237. {
  238.     return MSGSRC(info);
  239. }
  240.  
  241. static int
  242. pvm_nodemsgtag( info )
  243. info_t *info;
  244. {
  245.     return MSGTAG(info);
  246. }
  247.  
  248. static int
  249. pvm_nodemsglen( info )
  250. info_t *info;
  251. {
  252. int len;
  253. #if defined(IMA_PGON)
  254.     return MSGLEN(info, &len);
  255. #endif
  256. #if defined(IMA_SP2MPI)
  257.     MSGLEN(info,&len);
  258.     return len;
  259. #endif
  260. }
  261.  
  262. /* ---------- ( Host <--> Node ) Routines ----------- */
  263. static int
  264. pvm_ihostsend(appid, tag, buffer, len, dest, partid, mid)
  265. int appid;
  266. int tag;
  267. char *buffer;
  268. int len;
  269. int dest;
  270. int partid;
  271. msgmid_t *mid;
  272. {
  273. #if defined(IMA_PGON)
  274.     return ((*mid) = HOSTASYNCSEND(appid, tag, buffer, len, dest, partid, mid));
  275. #endif
  276. #if defined(IMA_SP2MPI)
  277.     return  HOSTASYNCSEND(appid, tag, buffer, len, dest, partid, mid);
  278. #endif
  279. }
  280.  
  281.  
  282. static int
  283. pvm_ihostrecv(appid, src, tag, buffer, len, partid, info, mid)
  284. int appid;
  285. int src;
  286. int tag;
  287. char *buffer;
  288. int len;
  289. int partid;
  290. int *info;
  291. msgmid_t *mid;
  292. {
  293. #if defined(IMA_PGON)
  294.     return ((*mid)=HOSTASYNCRECV(appid, src, tag, buffer, len, partid, info, mid));
  295. #endif
  296. #if defined(IMA_SP2MPI)
  297.     return HOSTASYNCRECV(appid, src, tag, buffer, len, partid, info, mid);
  298. #endif
  299. }
  300.  
  301. static msgmid_t 
  302. pvm_hostmsgmerge (mid1, mid2)
  303. msgmid_t *mid1;
  304. msgmid_t *mid2;
  305. {
  306.     return HOSTMSGMERGE(mid1, mid2);
  307. }
  308.  
  309. static int
  310. pvm_hostmsgdone (appid, mid, info)
  311. int appid;
  312. msgmid_t *mid;
  313. info_t *info;
  314. {
  315. int flag;
  316. #if defined(IMA_PGON)
  317.     return HOSTMSGDONE(appid, mid, &flag, info);
  318. #endif
  319. #if defined(IMA_SP2MPI)
  320.     HOSTMSGDONE(appid, mid, &flag, info);
  321.     return flag;
  322. #endif
  323. }
  324.  
  325. static int
  326. pvm_hostmsgsrc( info )
  327. info_t *info;
  328. {
  329.     return HOSTMSGSRC(info);
  330. }
  331.  
  332. static int
  333. pvm_hostmsgtag( info )
  334. info_t *info;
  335. {
  336.     return HOSTMSGTAG(info);
  337. }
  338.  
  339. static int
  340. pvm_hostmsglen( info )
  341. info_t *info;
  342. {
  343. int len;
  344. #if defined(IMA_PGON)
  345.     return HOSTMSGLEN(info, &len);
  346. #endif
  347. #if defined(IMA_SP2MPI)
  348.     HOSTMSGLEN(info,&len);
  349.     return len;
  350. #endif
  351. }
  352.  
  353.  
  354. static MSGFUNC nodemsgfunc = { pvm_inodesend,
  355.                         pvm_inoderecv,
  356. #if defined(IMA_PGON)
  357.                         pvm_nodemsgmerge,
  358. #else
  359.                         NULL,
  360. #endif
  361.                         pvm_nodemsgdone,
  362.                         pvm_nodemsglen,
  363.                         pvm_nodemsgsrc,
  364.                         pvm_nodemsgtag };
  365.  
  366. static MSGFUNC hostmsgfunc = { pvm_ihostsend,
  367.                         pvm_ihostrecv,
  368. #if defined(IMA_PGON)
  369.                         pvm_hostmsgmerge,
  370. #else
  371.                         NULL,
  372. #endif
  373.                         pvm_hostmsgdone,
  374.                         pvm_hostmsglen,
  375.                         pvm_hostmsgsrc,
  376.                         pvm_hostmsgtag };
  377.  
  378.  
  379. /* -------- External Routines --------- */
  380. MSGFUNC_PTR 
  381. pvm_nodemsgfunc()
  382. {
  383.     return &nodemsgfunc;
  384.  
  385. MSGFUNC_PTR
  386. pvm_hostmsgfunc()
  387. {
  388.     return &hostmsgfunc;
  389. }
  390.  
  391.  
  392. /* Define some intialization functions that are only used by node procs
  393. */
  394. #if defined(IMA_NODE)
  395.  
  396. /* ------ messsage system initialization, destruction ----- */
  397. int
  398. pvm_mpp_message_init(node, partsize, host, partid)
  399. int *node;
  400. int *partsize;
  401. int *host; 
  402. int *partid;
  403. {
  404.     char errtxt[64];
  405.     char *msgbuf;
  406.     char *p;
  407.  
  408.     int ac = 0;
  409.     int msgbufsiz;
  410.  
  411.     *node = -1;            /* initialize to bogus values */
  412.     *partsize = -1;
  413.     *host = -1;
  414.     *partid = 0;
  415.  
  416. #if defined(IMA_PGON)
  417.  
  418.     if ( (*partid = _myptype()) == INVALID_PTYPE ) 
  419.     {
  420.         pvmlogerror("mpp_message_init() no process type\n");
  421.         return PvmSysErr;
  422.     }
  423.  
  424.     _setptype(0);        /* always set ptype to 0 */
  425.  
  426.     *node = _mynode();
  427.     *partsize = _numnodes();
  428.     *host = *partsize;
  429. #endif
  430.  
  431. #if defined(IMA_SP2MPI)
  432.     MPI_Init(&ac, NULL);
  433.     MPI_Comm_rank(MPI_COMM_WORLD, node);
  434.     MPI_Comm_size(MPI_COMM_WORLD, partsize);
  435.     *host = *partsize - 1;        /* host is last process in the group */
  436.  
  437. /*    if (!(p = getenv("PVMBUFSIZE")) || !(msgbufsiz = strtol(p, (char**)0, 0)))
  438.         msgbufsiz = MPIBUFSIZ;
  439.     if (!(msgbuf = malloc(msgbufsiz)))
  440.         pvmlogerror("relay() out of memory");
  441.     MPI_Buffer_attach(msgbuf, msgbufsiz);           used in psend or relay */
  442.  
  443.         if (*node == *host)    /* I'm the host, run the host proc */
  444.     {
  445.             (void)pvmhost();
  446.     }
  447.     else
  448.         
  449.  
  450. /*    MPI_Bcast(pvminfo, SIZEHINFO, MPI_INT, pvmhostnode, MPI_COMM_WORLD); */
  451. #endif
  452.  
  453.     return 0;
  454. }
  455.  
  456. int
  457. pvm_mpp_message_stop() 
  458. {
  459. #if defined(IMA_SP2MPI)
  460.     MPI_Finalize();
  461. #endif
  462.     return 0;
  463. }
  464.     
  465. #endif /* IMA_NODE */
  466. /* ===============  Host and Relay Processes for MPI =============== */
  467. #if defined(IMA_SP2MPI) && defined(IMA_NODE)
  468.  
  469. #define NMPPSBUFMIDS 32            /* number of allowed outstanding send mids */
  470. static msgmid_t mppsendmids[NMPPSBUFMIDS];
  471. static CHUNK_PTR *mppoutchunks[NMPPSBUFMIDS];
  472. static int lastindex = 0;
  473.  
  474. /* ------------ relay -------------- */
  475. /* Relay messages between pvmd and node tasks. */
  476. void relay(dsock, numnodes)
  477.     int dsock;                    /* pvmd socket */
  478.     int numnodes;                /* number of nodes in the partition */
  479. {
  480.     fd_set wrk_rfds, wrk_wfds, rfds, wfds;
  481.     int nfds;
  482.     struct timeval tout;
  483.  
  484.     struct frag *topvmdq = 0;    /* frags to pvmd */
  485.     struct frag *frtask = 0;    /* (big) frag from task */
  486.     struct frag *totaskq;        /* frag being sent to task */
  487.     struct frag *fp;
  488.     struct frag *frpvmd = (struct frag *) NULL;
  489.     char *txcp = 0;             /* point to remainder of topvmd */
  490.     int txtogo = 0;             /* len of remainder of topvmd */
  491.     int toread;                    /* number of bytes to be read from pvmd */
  492.     int frtogo;                    /* len of remainder of a fragment */
  493.     int topvmd_dst;                /* dst of fragment being sent to pvmd */ 
  494.     int topvmd_src;                /* src of fragment being sent to pvmd */
  495.     int len;
  496.     int topvmd_ff;                /* ff of fragment being sent to pvmd */
  497.     int    dst;                    /* dst of fragment being sent to node */
  498.     int    node;                    /* node number */
  499.     MPI_Request rmhd;            /* msg IDs returned by async recv */
  500.     int n;
  501.     char *cp;
  502.     int err,i;
  503.     MPI_Status sta;                /* info on pending message */
  504.     int dummy;
  505.     int flag;                    /* MPI_Test result */
  506.     struct frag *hdr;
  507.     int taskbuf = 0;            /* current buffer number to probe */ 
  508.  
  509.  
  510.  
  511.  
  512.     MPP_DIRECTI_PTR taskdirect = (MPP_DIRECTI_PTR) NULL; /* ordering structs */
  513.  
  514.     MSG_INFO_PTR taskfrags = (MSG_INFO_PTR) NULL;
  515.  
  516.     CHUNK_PTR readyFrags = (CHUNK_PTR) NULL;
  517.  
  518.     MSGFUNC_PTR mfunc;
  519.     
  520.     struct frag *cftd = (struct frag *) NULL; /* cur.frag going to daemon */
  521.  
  522.     struct frag *cftt = (struct frag *) NULL; /* cur. frag to task */
  523.  
  524.     int cftd_togo;            /* len left to go to daemon */
  525.  
  526.     int frg_complete;        /* flag  if outgoing frag is complete */
  527.  
  528.  
  529. /* This is the relay process for MPI ports. 
  530.  *
  531.  * It should be called in pvm_mpp_beatask 
  532.  * after pvm_mpp_message_init, so none of the pre-allocated buffers have
  533.  * been allocated.
  534.  *
  535.  * It is basically a fragment forwarder
  536.  * pvmd -> task
  537.    - each fragment is read from the pvmd socket and forwarded to the
  538.      tasks.
  539.    task -> pvmd
  540.    - each fragment is read from a task and forwarded onto the pvmd socket.
  541.  
  542. */
  543.  
  544.     /* Initialization */
  545.  
  546.     mfunc = pvm_hostmsgfunc();
  547.  
  548.     nfds = dsock + 1;
  549.  
  550.     /* set up pre-allocated receive buffers to/from tasks */
  551.  
  552.     taskdirect = new_vdirectstruct(numnodes, NSBUFS, NRBUFS);
  553.  
  554.     taskfrags = init_recv_list(NSBUFS, PMTDBASE, MAXFRAGSIZE, 0, MPPANY,
  555.                     pvm_hostmsgfunc());
  556.  
  557.     pvmlogprintf("relay() numnodes is %d\n", numnodes);
  558.      /* initialize the packet numbering for packets to/from peers */    
  559.     for (i = 0; i < numnodes; i ++)
  560.     {
  561.         fill_directstruct (taskdirect + i, NRBUFS, i,
  562.             0, PMTDBASE, 0, MPPANY);
  563.         init_chunkostruct( (taskdirect+i)->ordering, NSBUFS);
  564.     }
  565.  
  566.     /* initialize the fragment queues */
  567.  
  568.      if( !(topvmdq = TALLOC(1, struct frag, "topvmdq")) 
  569.         || !(totaskq = TALLOC(1, struct frag, "totaskq")) )
  570.     {
  571.         pvmlogerror("relay() could not init frag queues\n");    
  572.         pvm_mpp_message_stop();
  573.         exit(PvmOutOfRes);
  574.     }    
  575.     BZERO((char *) topvmdq, sizeof (struct frag));
  576.     BZERO((char *) totaskq, sizeof (struct frag));
  577.  
  578.     topvmdq->fr_link = topvmdq->fr_rlink = topvmdq;
  579.     totaskq->fr_link = totaskq->fr_rlink = totaskq;
  580.  
  581.  
  582.     /* initialize the asynchronous send structures */    
  583.  
  584.     pvm_init_asynch_list(mppsendmids, mppoutchunks, NMPPSBUFMIDS);
  585.  
  586.     FD_ZERO(&wrk_rfds);        /* zero the file set descriptor for select */ 
  587.     FD_ZERO(&wrk_wfds);
  588.  
  589.     tout.tv_sec = tout.tv_usec = 0; /* don't block in select */
  590.     while (1) 
  591.     {            
  592.         /* Step 1) get any frags from tasks */
  593.  
  594.         while ( fp = (struct frag *) pvm_chunkReady(taskfrags, NRBUFS,
  595.                 mfunc, taskdirect, numnodes, &taskbuf, &readyFrags) )     
  596.         {
  597.             LISTPUTBEFORE(topvmdq, fp, fr_link, fr_rlink);
  598.             if (pvmdebmask & PDMNODE)
  599.             pvmlogprintf("Frag from task %d len %d \n", fp->fr_src, fp->fr_len);
  600.         }
  601.             
  602.  
  603.         /* If we don't have a frag to go to pvmd, check if one has been read */
  604.         if (!cftd && topvmdq->fr_link != topvmdq )
  605.         {
  606.             cftd = topvmdq->fr_link;
  607.             LISTDELETE(cftd, fr_link, fr_rlink);
  608.             cftd_togo = cftd->fr_len;
  609.             txcp = cftd->fr_dat;
  610.             if (pvmdebmask & PDMNODE)
  611.             pvmlogprintf("Frag to daemoen %d len %d \n", cftd->fr_src, cftd->fr_len);
  612.  
  613.         }
  614.  
  615.         /* There is a frag that needs to go to the daemon ? */
  616.         if (cftd)        
  617.         {
  618.             FD_SET(dsock, &wrk_wfds);
  619.             if (pvmdebmask & PDMNODE)
  620.             pvmlogprintf("Setting dsock in the write fds \n");
  621.         }
  622.  
  623.         FD_SET(dsock, &wrk_rfds); /* need to probe the socket for reading */
  624.  
  625.         rfds = wrk_rfds;
  626.         wfds = wrk_wfds;
  627.  
  628.         if (select(nfds, &rfds, &wfds, (fd_set*)0, &tout) == -1 
  629.         && errno != EINTR) {
  630.             pvmlogperror("relay() select");
  631.             pvm_exit();
  632.             pvm_mpp_message_stop();
  633.             exit(-1);
  634.         }
  635.  
  636.         /* we've got something read  from pvmd, if the frag is 
  637.             complete, then put it on the totaskq  */
  638.         if (FD_ISSET(dsock, &rfds)) 
  639.         {
  640.             frg_complete = 1; /* initially mark the frag as completely read */
  641.  
  642.             if (!frpvmd) {            /* allocate a data frag */
  643.                 frpvmd = fr_new(MAXFRAGSIZE);
  644.                 toread = TDFRAGHDR;
  645.             }
  646.  
  647.             n = read(dsock, frpvmd->fr_dat + frpvmd->fr_len, toread);
  648.  
  649.             if (n == -1 && errno != EWOULDBLOCK && errno != EINTR) {
  650.                 pvmlogperror("relay() read pvmd sock");
  651.                 pvm_mpp_message_stop();
  652.                 exit(12);
  653.             }
  654.             if (!n) {
  655.                 /* pvmlogerror("relay() read EOF on pvmd sock\n"); */
  656.                 pvm_mpp_message_stop();
  657.                 exit(13);
  658.             }
  659.  
  660.             if ((frpvmd->fr_len += n) < TDFRAGHDR) {
  661.                 toread -= n;
  662.                  frg_complete = 0;    
  663.             }
  664.  
  665.             
  666.             n = TDFRAGHDR + pvmget32(frpvmd->fr_dat + 8);   /* get the length */
  667.  
  668.             if (frpvmd->fr_len < n) {
  669.                 if (frpvmd->fr_max < n) {                    /* n > MAXFRAGSIZ */
  670.                     hdr = frpvmd;
  671.                     frpvmd = fr_new(n);
  672.                     BCOPY(hdr->fr_dat, frpvmd->fr_dat, hdr->fr_len);
  673.                     frpvmd->fr_len = hdr->fr_len;
  674.                     fr_unref(hdr);
  675.                 }
  676.                 toread = n - frpvmd->fr_len;
  677.                 frg_complete = 0;
  678.             }
  679.  
  680.             if (pvmdebmask & PDMNODE)
  681.             pvmlogprintf("reading from daemon len %d frg_complete %d \n", frpvmd->fr_len, frg_complete);
  682.             if (frg_complete)
  683.             {    
  684.                 dst = pvmget32(frpvmd->fr_dat);
  685.  
  686.                 frpvmd -> fr_src = dst; /* overload the src field with the dst */
  687.         
  688.                 LISTPUTAFTER(totaskq, frpvmd, fr_link, fr_rlink); /* queue it */
  689.  
  690.             if (pvmdebmask & PDMNODE)
  691.             pvmlogprintf("frg_complete dest %x len %d\n", frpvmd->fr_src, frpvmd->fr_len);
  692.                 frpvmd = (struct frag *) NULL;
  693.             }
  694.  
  695.         }
  696.  
  697.  
  698.         if (!cftt && totaskq -> fr_link != totaskq )
  699.         {
  700.             cftt = totaskq->fr_link;
  701.             LISTDELETE(cftt, fr_link, fr_rlink);
  702.             if (pvmdebmask & PDMNODE)
  703.             pvmlogprintf("New frag to task %x  len %d \n",
  704.                 cftt->fr_src, cftt->fr_len);
  705.         }
  706.  
  707.  
  708.         /* Let's see if we something to send to a task  */
  709.         if (cftt)
  710.         {
  711.             if (send_to_node(cftt,  taskdirect, numnodes) >= cftt->fr_len)
  712.                 cftt = (struct frag *) NULL;
  713.             else
  714.             /* if for some reason send_to_node could not send this frag,
  715.                the next time through the loop, the frag will be resent.
  716.             */;
  717.         }
  718.  
  719.         /* Now let's see if we have something to write to the pvmd */
  720.  
  721.         if ( cftd && FD_ISSET(dsock, &wfds))
  722.         {
  723.             if (pvmdebmask & PDMNODE)
  724.             pvmlogprintf("Writing packet to daemon %d\n", cftd->fr_len);
  725.             n = write(dsock, txcp, cftd_togo);
  726.  
  727.             if (n == -1 && errno != EWOULDBLOCK && errno != EINTR) {
  728.                 pvmlogperror("relay() write pvmd sock");
  729.                 pvm_mpp_message_stop();
  730.                 exit(14);
  731.             }
  732.             if (n > 0 && (cftd_togo -= n) > 0) 
  733.                 txcp += n;
  734.             if (!cftd_togo) {        /* entire message sent */
  735.                 FD_CLR(dsock, &wrk_wfds);
  736.                 fr_unref(cftd);
  737.                 cftd = (struct frag *) NULL;
  738.             }
  739.         }
  740.         else
  741.         {
  742.             if (cftd)
  743.             if (pvmdebmask & PDMNODE)
  744.                 pvmlogprintf("could not write to daemon \n");
  745.         }
  746.     }
  747. }
  748.  
  749. /* --------- send_to_node() ------------ */
  750. /* return the number of bytes sent. This posts sends asynchronously.
  751.    A fixed number of asynch sends are allowed to be outstanding.
  752.    If the relay asks for more than this number in sends. send_to_node
  753.    will return 0.
  754. */
  755.  
  756. send_to_node(fp,  taskdirect, numnodes )
  757. struct frag *fp; 
  758. MPP_DIRECTI_PTR taskdirect;
  759. int numnodes;
  760. {
  761.     int appid = 0;
  762.     int cc;
  763.     int idx;
  764.     int len;
  765.     int node;
  766.     int ptype;
  767.     int tag;
  768.     msgmid_t mid;
  769.  
  770.  
  771.     MPP_DIRECTI_PTR tdirect;
  772.  
  773.     MSGFUNC_PTR mfunc;
  774.  
  775.     mfunc = pvm_hostmsgfunc();
  776.  
  777.     if ( (idx = pvm_mpp_find_midx(mppsendmids, mppoutchunks, &lastindex, 
  778.                 NMPPSBUFMIDS, mfunc)) >= 0)
  779.     {
  780.         node = fp->fr_src & TIDNODE;
  781.         len = fp->fr_len;
  782.         ptype = (fp->fr_src & TIDPTYPE) >> (ffs(TIDPTYPE) - 1);
  783.  
  784.         tdirect = taskdirect + node;
  785.  
  786.         tag = tdirect->tagbase + tdirect->sseq;
  787.  
  788.         if ( ++(tdirect->sseq) >= tdirect->nbufs)
  789.             tdirect->sseq = 0;
  790.  
  791.             
  792.             if ( (cc  = (*mfunc->imsgsend)(appid, tag,
  793.                 fp->fr_dat, len, node, ptype, &mid)) < 0 )
  794.          {
  795.             pvmlogprintf("Relay -- async send failed!  (%d) \n", cc);
  796.         }
  797.         else /* send was ok. */
  798.         {
  799.             pvm_assign_mid(mppsendmids, mid, idx);
  800.             pvm_assign_chunk(mppoutchunks, (CHUNK_PTR) fp, idx);
  801.         }
  802.  
  803.         return (len);
  804.  
  805.     }
  806.     else
  807.         return 0;    /* could not send, no free low-level message ids */
  808. }
  809.     
  810.  
  811. /* --------- pvmhost() ------------ */
  812. /* We're the "host" process. Connect to pvmd. */
  813. int 
  814. pvmhost()
  815. {
  816.     char *p;
  817.     int dsock;                    /* pvmd socket */
  818.     struct sockaddr_in dsadr;    /* address of pvmd socket */
  819.     int n;
  820.     int i;
  821.     int cc;
  822.     int err;
  823.     
  824.     int pvminfo[SIZEHINFO];        /* ntask, hostpart, ptid, MTU, NDF */
  825.     char nullmsg[TDFRAGHDR+MSGHDRLEN];
  826.     int appid = 0;                /* Would be used for PGONPUMA */
  827.  
  828.     MSGFUNC_PTR mfunc;
  829.  
  830.     msgmid_t confmid;
  831.  
  832.     info_t confinfo;
  833.  
  834.     if (!(p = getenv("PVMSOCK"))) {
  835.         pvmlogerror("pvmhost() getenv() pvmd socket\n");
  836.         pvm_mpp_message_stop();
  837.         exit(2);
  838.     }
  839.     if ((dsock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
  840.         pvmlogperror("pvmhost() socket");
  841.         pvm_mpp_message_stop();
  842.         exit(3);
  843.     }
  844.     BZERO((char*)&dsadr, sizeof(dsadr));
  845.     hex_inadport(p, &dsadr);
  846.     dsadr.sin_family = AF_INET;
  847.     n = sizeof(dsadr);
  848.     while (connect(dsock, (struct sockaddr*)&dsadr, n) == -1)
  849.         if (errno != EINTR) {
  850.             pvmlogperror("pvmhost() connect");
  851.             pvm_mpp_message_stop();
  852.             exit(4);
  853.         }
  854. #ifndef NOSOCKOPT
  855.     n = 1;
  856.     if (setsockopt(dsock, IPPROTO_TCP, TCP_NODELAY, (char*)&n, sizeof(int))
  857.     == -1) {
  858.         pvmlogperror("pvmhost() setsockopt");
  859.         pvm_mpp_message_stop();
  860.         exit(5);
  861.     }
  862. #endif
  863.     if (!(p = getenv("PVMEPID"))) {
  864.         pvmlogerror("pvmhost() getenv() pid\n");
  865.         pvm_mpp_message_stop();
  866.         exit(6);
  867.     }
  868.     pvmmyupid = atoi(p);
  869.     BZERO(nullmsg, MAXHDR);
  870.     pvmput32(nullmsg, TIDPVMD);
  871.     pvmput32(nullmsg + 4, pvmmyupid);
  872.     pvmput32(nullmsg + 8, MSGHDRLEN);
  873.     pvmput32(nullmsg + 12, 0);            /* to keep putrify happy */
  874.     pvmput8(nullmsg + 12, FFSOM|FFEOM);
  875.     if (write(dsock, nullmsg, TDFRAGHDR+MSGHDRLEN) != TDFRAGHDR+MSGHDRLEN
  876.     || read(dsock, pvminfo, SIZEHINFO*sizeof(int)) != SIZEHINFO*sizeof(int)) {
  877.         pvmlogperror("pvmhost() write/read");
  878.         pvm_mpp_message_stop();
  879.         exit(8);
  880.     }
  881.     /* Multicast the configuration message */  
  882.     mfunc = pvm_hostmsgfunc();
  883.     pvmlogprintf("pvmhost() --  Starting configMessage send Loop %d\n",
  884.         pvmhostnode);
  885.     for (i = 0; i < pvmhostnode; i ++ )
  886.     {
  887.         if ( (*mfunc->imsgsend)(appid, PMTCONF,  (char *) pvminfo, 
  888.                 sizeof(pvminfo), i, PVMDPTYPE, &confmid) < 0) 
  889.         {
  890.             pvmlogperror("pvmhost() configuration message \n");
  891.             err = PvmDSysErr;
  892.             pvm_mpp_message_stop();
  893.             exit(9);
  894.         }
  895.         while (! ((*mfunc->msgdone)(appid, &confmid, &confinfo)));
  896.     }
  897.     pvmlogerror("pvmhost() -- Finished Message send Loop\n");
  898.     (void)relay(dsock, pvmhostnode + 1);
  899. }
  900.  
  901. #endif /*IMA_SP2MPI*/
  902.